NYC Trips Analysis

Alexandre Magno - 12/06/2020

In [4]:
from IPython.display import HTML

HTML('''<script>
code_show=true; 
function code_toggle() {
 if (code_show){
 $('div.input').hide();
 } else {
 $('div.input').show();
 }
 code_show = !code_show
} 
$( document ).ready(code_toggle);
</script>
''')
Out[4]:
In [62]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col, to_timestamp, date_format
from pyspark.sql.types import LongType, IntegerType, FloatType
from pyspark.sql.functions import udf
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.graph_objects as go
from scipy.stats import norm
from statsmodels.tsa.seasonal import seasonal_decompose

%matplotlib inline
sns.set(font_scale = 1.2)

MAX_MEMORY = "3g"

spark = SparkSession \
    .builder \
    .appName("NYC Trips Analysis") \
    .config("spark.executor.memory", MAX_MEMORY) \
    .config("spark.driver.memory", MAX_MEMORY) \
    .master('local[*]') \
    .getOrCreate()
In [2]:
# Defining the path to all files being used

trips_path = [
    "data/data-sample_data-nyctaxi-trips-2009-json_corrigido.json",
    "data/data-sample_data-nyctaxi-trips-2010-json_corrigido.json",
    "data/data-sample_data-nyctaxi-trips-2011-json_corrigido.json",
    "data/data-sample_data-nyctaxi-trips-2012-json_corrigido.json"
]

vendor_path = "data/data-vendor_lookup-csv.csv"
payments_path = "data/data-payment_lookup-csv.csv"

Exploratory Data Analysis

Visualizing the trips data schema

In [3]:
# Spark will read and stack each file of the trips data

trips_data = spark.read.json(trips_path)
trips_data.printSchema()
root
 |-- dropoff_datetime: string (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- rate_code: string (nullable = true)
 |-- store_and_fwd_flag: long (nullable = true)
 |-- surcharge: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- vendor_id: string (nullable = true)

In [4]:
# Caching the largest Spark DF to speed up the following computations

trips = trips_data.cache()
print('Number of rows: ', trips.count())
Number of rows:  4000000

Vendors and payments datasets

In [5]:
# The vendors and payments files are quite small, so we can check them with pandas before converting to Spark DF

vendors = pd.read_csv(vendor_path)
payments = pd.read_csv(payments_path, skiprows=1)

vendors.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5 entries, 0 to 4
Data columns (total 9 columns):
 #   Column     Non-Null Count  Dtype 
---  ------     --------------  ----- 
 0   vendor_id  5 non-null      object
 1   name       5 non-null      object
 2   address    5 non-null      object
 3   city       5 non-null      object
 4   state      5 non-null      object
 5   zip        5 non-null      int64 
 6   country    5 non-null      object
 7   contact    5 non-null      object
 8   current    5 non-null      object
dtypes: int64(1), object(8)
memory usage: 488.0+ bytes
In [6]:
payments.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3288 entries, 0 to 3287
Data columns (total 2 columns):
 #   Column          Non-Null Count  Dtype 
---  ------          --------------  ----- 
 0   payment_type    3288 non-null   object
 1   payment_lookup  3288 non-null   object
dtypes: object(2)
memory usage: 51.5+ KB

We have no missing values in payments and vendors. However, when we inspect the payments table more closely, we see a strange behavior.

In [7]:
payments.tail()
Out[7]:
payment_type payment_lookup
3283 3267 Foo
3284 3268 Foo
3285 3269 Foo
3286 3270 Foo
3287 3271 Foo
In [8]:
payments['payment_lookup'].value_counts()
Out[8]:
Foo          3271
Cash            5
Credit          5
Dispute         3
No Charge       3
Unknown         1
Name: payment_lookup, dtype: int64

Apparently there is a lot of noise with no meaning in this table. So, it is better to remove it.

In [9]:
payments = payments[payments['payment_lookup'] != 'Foo']
In [10]:
# Transforming payments and vendors pandas DF to Spark DF

vendors = spark.createDataFrame(vendors)
payments = spark.createDataFrame(payments)

Describing the trips data

In [11]:
# Checking for missing values in the trips data

trips.describe().toPandas()
Out[11]:
summary dropoff_datetime dropoff_latitude dropoff_longitude fare_amount passenger_count payment_type pickup_datetime pickup_latitude pickup_longitude rate_code store_and_fwd_flag surcharge tip_amount tolls_amount total_amount trip_distance vendor_id
0 count 4000000 4000000 4000000 4000000 4000000 4000000 4000000 4000000 4000000 0 248 4000000 4000000 4000000 4000000 4000000 4000000
1 mean None 40.13499827750944 -72.8480442019709 9.629420790000427 1.733573 None None 40.12472651785755 -72.83059548522309 None 0.0 0.1619385 0.40854002000000234 0.12359449000001618 10.32686640000022 2.6782093029999956 None
2 stddev None 4.986513186631796 9.050543165477416 7.567478378872196 1.254535789499849 None None 5.026361951567406 9.12239490789848 None 0.0 0.3083548215747962 1.2586321423263833 0.7616635980368894 8.50320798047934 3.13060131927324 None
3 min 2009-01-04T00:06:54.716509+00:00 -0.015202 -79.191394 2.5 0 CASH 2009-01-04T00:00:16.442478+00:00 -0.009262 -84.878351 None 0 0.0 0.0 0.0 2.5 0.0 CMT
4 max 2012-10-28T00:11:44.613549+00:00 47.935812 0.008668 200.0 6 No Charge 2012-10-27T23:59:05.26282+00:00 47.922847 0.004023 None 0 1.0 98.23 20.0 230.0 49.92 VTS

Two columns stand out here: rate_code and store_and_fwd_flag. The first one has no values at all, while the second one has a lot of missing values. Therefore, it is better to drop those columns because it will not add any value to the analysis. The other ones are fine.

In [12]:
trips = trips.drop("rate_code", "store_and_fwd_flag")

numeric_columns = ['fare_amount', 'passenger_count', 'surcharge', 
                   'tip_amount', 'tolls_amount', 'total_amount', 'trip_distance']

trips_numeric = trips.select(numeric_columns).toPandas()

categorical_variables = trips_numeric[['passenger_count', 'surcharge']]
continuous_variables = trips_numeric.drop(['passenger_count', 'surcharge'], axis=1)

Taking a look at the categorical variables

Checking the number of passengers variable

In [13]:
plt.figure(figsize=(15,8))

passengers = categorical_variables['passenger_count'].value_counts(normalize=True)

sns.barplot(x=passengers.index, y=passengers.values, color='royalblue')
plt.title('Passengers per ride', fontsize=15)
plt.ylabel('% of total rides', labelpad=10)

plt.show()
In [14]:
print('Percent of trips with no passengers: ', passengers[0]*100, '%')
Percent of trips with no passengers:  0.0111 %
In [15]:
print('Percent of trips with six passengers: ', passengers[6]*100, '%')
Percent of trips with six passengers:  0.39709999999999995 %

Apparentely there is a few trips with no passengers at all (0,01% of total trips). Assuming that a trip with no passengers does not makes sense, these occurances could be explained by data input errors or typos. So, it is better to consider it as noise and remove it. Besides that, we see that the vast majority of trips (more than 60%) happens with only one passenger. Curiously, the number of trips with five passengers is greater than the trips with three or four passengers. Also, there are some trips (0.4%) with six passengers.

Doing a short research on the theme, I discovered that are two types of taxis in NYC: one that can fit up to 4 passengers (the traditional yellow cab) and another one that looks more like a mini van that can fit up to 5 passengers. So, probably all the five passengers rides occur in a different type of vehicle. Also, a six passenger ride is not allowed by law, unless the sixth passenger is a child riding on the leap of an adult. So, although rare, those could be legit rides.

Checking the surcharge variable

In [16]:
plt.figure(figsize=(15,8))

surcharge = categorical_variables['surcharge'].value_counts(normalize=True)

sns.barplot(x=surcharge.index, y=surcharge.values, color='royalblue')
plt.title('Surcharges per ride', fontsize=15)
plt.ylabel('% of total rides', labelpad=10)

plt.show()

There is only three options for surcharges: none (0), 0.5 and 1.0. I assume those are in dollars. The majority of rides (70%) takes place with no surcharges at all.

Taking a look at the continuous variables

In [17]:
fig, axes = plt.subplots(3, 2, figsize=(20, 22))

# Fare amount
sns.distplot(continuous_variables['fare_amount'], fit=norm, kde=False, ax=axes[0,0])
mu, sigma = norm.fit(continuous_variables['fare_amount'])
axes[0,0].set_title('fare_amount distribution', fontsize=15)
axes[0,0].legend(['Normal dist. ($\mu = $ {:.2f} ; $\sigma = $ {:.2f})'.format(mu, sigma)], loc='best', 
                 fontsize=12)


# Tip amount
sns.distplot(continuous_variables['tip_amount'], fit=norm, kde=False, ax=axes[0,1])
mu, sigma = norm.fit(continuous_variables['tip_amount'])
axes[0,1].set_title('tip_amount distribution', fontsize=15)
axes[0,1].legend(['Normal dist. ($\mu = $ {:.2f} ; $\sigma = $ {:.2f})'.format(mu, sigma)], loc='best', 
                 fontsize=12)


# Tolls amount
sns.distplot(continuous_variables['tolls_amount'], fit=norm, kde=False, ax=axes[1,0])
mu, sigma = norm.fit(continuous_variables['tolls_amount'])
axes[1,0].set_title('tolls_amount distribution', fontsize=15)
axes[1,0].legend(['Normal dist. ($\mu = $ {:.2f} ; $\sigma = $ {:.2f})'.format(mu, sigma)], loc='best', 
                 fontsize=12)


# Total amount
sns.distplot(continuous_variables['total_amount'], fit=norm, kde=False, ax=axes[1,1])
mu, sigma = norm.fit(continuous_variables['total_amount'])
axes[1,1].set_title('total_amount distribution', fontsize=15)
axes[1,1].legend(['Normal dist. ($\mu = $ {:.2f} ; $\sigma = $ {:.2f})'.format(mu, sigma)], loc='best', 
                 fontsize=12)


# Trip distance
sns.distplot(continuous_variables['trip_distance'], fit=norm, kde=False, ax=axes[2,0])
mu, sigma = norm.fit(continuous_variables['trip_distance'])
axes[2,0].set_title('trip_distance distribution', fontsize=15)
axes[2,0].legend(['Normal dist. ($\mu = $ {:.2f} ; $\sigma = $ {:.2f})'.format(mu, sigma)], loc='best', 
                 fontsize=12)


plt.show()

The distributions are all right skewed, meaning that the mean is bigger than the median. Also, it is possible to see that there is a considerable number of outliers in the variables. If used as features to a machine learning model, for instance, these outliers have to be removed. Also, it might be necessary to transform the data (normalize, scale, log-transform, etc.) before feeding it to a model.

Checking for correlations

In [18]:
# Correlations

plt.figure(figsize=(15,8))

trips_corr = trips_numeric.corr()

sns.heatmap(trips_corr, cmap='Blues', annot=True, linewidths=.5, fmt='.2f')
plt.title('Correlations matrix', fontsize=20, pad=20)

plt.show()

The trip distance is strongly correlated with the total amount and with the fare amount, indicating that the distance is the most relevant factor in a ride's price. No surprises here. There is no strong negative correlations in the data. Futhermore, the tip amount and tolls amount are weakly but positive correlated with the fare and total amount. By analyzing the Pearson's R correlation matrix, it is possible to infer that the price of a ride is heavily influenced by the distance but also by the tips and tolls applied to it.

In [19]:
plt.figure(figsize=(15,8))

sns.scatterplot(x=continuous_variables['trip_distance'], y=continuous_variables['fare_amount'], alpha=0.7)
plt.title('Fare amount based on trip distance', fontsize=15, pad=20)

plt.show()

Here we see that a lot of points follow the major linear trend. However, we see some trips with zero distance and positive values of fare amount. This is probably due to an error when computing the trip's distance, because this behavior makes no sense. On the other hand, we have some trips that have positive distances values but no charges! By looking at the payments type, we see that there is a category named "No Charges", that may explain this behavior.

It is possible to see another anomaly around the fare amount of 48, with a lot of trip's distances being assigned to this value.

When trying to build a model to predict the fare amount, it would be beneficial to exclude those points from the training set, as they do not represent the general trend.

In [20]:
plt.figure(figsize=(15,8))

sns.scatterplot(x=continuous_variables['fare_amount'], y=continuous_variables['total_amount'], alpha=0.7)
plt.title('Total amount based on fare amount', fontsize=15, pad=20)

plt.show()

Here we see a clearly linear correlation between the two variables, as expected. However, some trips have a total amount slightly bigger than the fare amount, and this can be explained by additional tolls being applied. However, some points are way off the trend, and may represent outliers as well.

Minimum requirements

OBS: When answering these questions, I decided to not remove the outliers that I pointed in the exploratory analysis section. I preferred to let these answers reflect the raw dataset, since removing outliers is a strictly necessary step only when building a predictive model.

1. What is the average distance traveled by trips with a maximum of 2 passengers?

In [21]:
trips.createOrReplaceTempView("trips")

avg_distance_query = """
SELECT AVG(trip_distance) as avg
FROM trips 
WHERE passenger_count <= 2
"""

avg_distance = spark.sql(avg_distance_query).toPandas()


fig = go.Figure(go.Indicator(
    title = {'text': "Average distance for trips with maximum of 2 passengers"},
    mode = "number",
    value = avg_distance.avg[0],
    number = {'suffix': " miles"},
    domain = {'x': [0, 1], 'y': [0, 1]}))

fig.show()

2. Which are the 3 biggest vendors based on the total amount of money raised?

In [22]:
vendors.createOrReplaceTempView("vendors")

vendors_query = """
SELECT vendors.name as vendor,
       SUM(trips.total_amount) as money_raised
FROM trips
JOIN vendors using(vendor_id)
GROUP BY vendors.name 
ORDER BY money_raised DESC
LIMIT 3
"""

biggest_vendors = spark.sql(vendors_query).toPandas()


fig = go.Figure(go.Bar(x=biggest_vendors.money_raised, y=biggest_vendors.vendor, orientation='h'))
fig.update_layout(title={'text': '<b>Three biggest vendors</b>', 'x': 0.5, 'xanchor': 'center'},
                  xaxis_title='Money raised')
fig['layout']['yaxis']['autorange'] = "reversed"
fig.show()

3. Make a histogram of the monthly distribution over 4 years of rides paid with cash

In [28]:
payments.createOrReplaceTempView("payments")

trips_cash_query = """
SELECT DATE_TRUNC('MONTH', trips.dropoff_datetime) as month,                                                                   
       COUNT(*) as trips_count
FROM trips
JOIN payments using(payment_type)
WHERE payments.payment_lookup = 'Cash'
GROUP BY month
ORDER BY trips_count
"""

histogram = spark.sql(trips_cash_query).toPandas()

fig = go.Figure(go.Histogram(x=histogram.trips_count))

fig.update_layout(title={'text': '<b>Distribution of monthly rides (2009 - 2012)</b>', 
                         'x': 0.5, 'xanchor': 'center'},
                  yaxis_title='Count', xaxis_title='Amount of rides per month', bargap=0.1)

fig.show()

4. Make a time series chart computing the number of tips each day for the last 3 months of 2012

For this one I'am considering that a tip existis if its value is greater than 0, since a tip of amount 0 is equivalent to no tip at all.

In [29]:
tips_query = """
SELECT DATE_TRUNC('DAY', dropoff_datetime) as day, 
       COUNT(*) as number_of_tips
FROM trips 
WHERE extract(YEAR from dropoff_datetime) = 2012 
  AND DATE_TRUNC('DAY', dropoff_datetime) >= '2012-10-01'
  AND tip_amount > 0
GROUP BY day
ORDER BY day
"""

tips = spark.sql(tips_query).toPandas()


fig = go.Figure(data=go.Scatter(x=tips.day, y=tips.number_of_tips))
fig.update_layout(title={'text': '<b>Number of tips per day (Oct, Nov and Dec. 2012)</b>', 
                         'x': 0.5, 'xanchor': 'center'},
                 yaxis_title='Number of tips given')

fig.show()

October 27th is the last day on the dataset. Therefore, there is no more data to be displayed after this date.

Bonus items

What is the average trip time on Saturdays and Sundays

In [30]:
# First we create a column in the Spark DF that represents the trip duration in minutes

trips = trips.withColumn("pickup_timestamp", to_timestamp(col("pickup_datetime"))) \
             .withColumn("dropoff_timestamp", to_timestamp(col("dropoff_datetime"))) \
             .withColumn("trip_duration", (col("dropoff_timestamp").cast(LongType()) \
                                           - col("pickup_timestamp").cast(LongType())) / 60)

trips.createOrReplaceTempView("trips")

trip_duration_query = """
SELECT AVG(trip_duration) as avg
FROM trips 
WHERE WEEKDAY(dropoff_datetime) IN (5, 6) -- 5: Saturdays and 6: Sundays
"""

trip_duration = spark.sql(trip_duration_query).toPandas()

fig = go.Figure(go.Indicator(title = {'text': "Average duration for weekend trips"}, mode = "number",
                             value = trip_duration.avg[0], number = {'suffix': " minutes"}, 
                             domain = {'x': [0, 1], 'y': [0, 1]}))

fig.show()

Analyse the data to find and prove seasonality

The data of all 4 years will be analyzied in order to find seasonality within the year. The goal is to find if the numbers of rides within a year follow more a seasonal or trend behavior. For that, the data will be aggregated by the day of the year (1 to 365), counting the number of rides between 2009 and 2012.

In [33]:
# Checking seasonality by counting the number of rides daily

trips = trips.withColumn("day_of_year", date_format(col("pickup_datetime"), "D").cast(IntegerType()))

trips.createOrReplaceTempView("trips")

daily_rides_query = """
SELECT day_of_year as day,
       COUNT(*) as number_of_rides
FROM trips
GROUP BY day
ORDER BY day
"""

daily_rides = spark.sql(daily_rides_query).toPandas()

fig = go.Figure(data=go.Scatter(x=daily_rides.day, y=daily_rides.number_of_rides))
fig.update_layout(title={'text': '<b>Number of rides per day of the year</b>', 
                         'x': 0.5, 'xanchor': 'center'},
                 yaxis_title='Number of rides', xaxis_title='Day of the year')

fig.show()

We clearly see a problem here. Due to the lack of data in the early days of a year, the data until the 10th day is not really trustable. Also, the lack of data after october 2012 caused the numbers of rides to drastically drop. Therefore, to compute a more reliable analysis with the data we have, I will exclude the days prior to the 10th day and also the days after the 300th day.

The goals is to find a trend or a season pattern along the year.

In [34]:
daily_rides_clean = daily_rides[(daily_rides['day'] >= 10) & (daily_rides['day'] <= 300)]

Now, plotting again

In [35]:
fig = go.Figure(data=go.Scatter(x=daily_rides_clean.day, y=daily_rides_clean.number_of_rides))
fig.update_layout(title={'text': '<b>Number of rides per day (10th - 300th day)</b>', 
                         'x': 0.5, 'xanchor': 'center'},
                 yaxis_title='Number of rides')

fig.show()

Now it looks like a stationary series. We will decompose it to take a more closely look on its components.

In [36]:
from pylab import rcParams
rcParams['figure.figsize'] = 14, 10

decomposed = seasonal_decompose(daily_rides_clean.number_of_rides, model='additive', period=30)

fig = decomposed.plot()

plt.show()

We can see that there is not a clear trend moving up or down throughout the year. The maximum amplitude of the trend factor is around 60, less than the amplitude of the seasonal component (150). It is possible to see that there is a strong seasonal component in the data, with the residuals being evenly distributed around the mean of zero, indicating homoscedasticity.

It is possible to use the Dickey-Fuller test for stationarity to check if this series could be considered stationary or not. A stationary time series is one whose statistical properties such as mean, variance, etc. are constant over time. The test is trying to reject the null hypothesis that a unit root exists and the data is non-stationary. If the null hypothesis is rejected (p-value < level of significance), then the alternative hypothesis can be considered valid, in other words, that the data is stationary.

In [37]:
from statsmodels.tsa.stattools import adfuller

test = adfuller(daily_rides_clean.number_of_rides, autolag='AIC')

print('p-value: ', test[1])
p-value:  2.103052508779596e-30

Therefore, we can reject the null hypothesis at a significance level of 95% and accept that this data is stationary or, in other words, that there is a strong seasonal component in the data.

Create assumptions, validate against a data and prove with storyelling and graphs

I decided to begin to explore the data geographically. My question is: which are the top 5 pickup locations in numbers of pickups? The same question is valid for the dropoff locations. My intetion is to investigate if there is specific points where people tend to "leave" more, and also points where they tend to "arrive" more.

By doing this invesgation and cross-validating it with actual geographic data, I hope to find interesting locations.

Let's first explore the coordinates data

In [38]:
trips.createOrReplaceTempView("trips")

coordinates_amount_query = """
SELECT pickup_latitude,
       pickup_longitude,
       dropoff_latitude,
       dropoff_longitude,
       total_amount as total
FROM trips
"""

coordinates_amount = spark.sql(coordinates_amount_query).toPandas()

fig, axes = plt.subplots(2, 2, figsize=(15,10))

sns.distplot(coordinates_amount['pickup_latitude'], ax=axes[0,0])
sns.distplot(coordinates_amount['pickup_longitude'], ax=axes[0,1])
sns.distplot(coordinates_amount['dropoff_latitude'], ax=axes[1,0])
sns.distplot(coordinates_amount['dropoff_longitude'], ax=axes[1,1])

plt.show()

There are values of latitude and longitude way off the expected. Since we are dealing with New York, it is better to filter out those coordinate values that does not correspond to the New York City area.

After doing a small research, I found that the "central" coordinate of NYC is (40.7141667, -74.0063889).

It is possible to define a bounding area of the city by pairs of min and max values of coordinates. Therefore, we can say that the area of NYC is comprised in the following bounding area:

  • min_longitude = -74.3
  • max_longitude = -73.7
  • min_latitude = 40.5
  • max_latitude = 40.9

So, we can define a function to filter out the points based on this area

In [39]:
def filter_within_area(df):
    """Filters the raw coordinates to the defined bounding area.
    
    Args:
        df (DataFrame): raw coordinates to be filtered.
    
    Returns:
        DataFrame: only coordinates within the bouding area.
        
    """
    filtered_df = df[
        (df.pickup_latitude >= 40.5) & \
        (df.pickup_latitude <= 40.9) & \
        (df.dropoff_latitude >= 40.5) & \
        (df.dropoff_latitude <= 40.9) & \
        (df.pickup_longitude >= -74.3) & \
        (df.pickup_longitude <= -73.7) & \
        (df.dropoff_longitude >= -74.3) & \
        (df.dropoff_longitude <= -73.7)
    ]
    
    return filtered_df
In [40]:
filtered_coordinates = filter_within_area(coordinates_amount)

Plotting again...

In [41]:
fig, axes = plt.subplots(2, 2, figsize=(15,10))

sns.distplot(filtered_coordinates['pickup_latitude'], ax=axes[0,0])
sns.distplot(filtered_coordinates['pickup_longitude'], ax=axes[0,1])
sns.distplot(filtered_coordinates['dropoff_latitude'], ax=axes[1,0])
sns.distplot(filtered_coordinates['dropoff_longitude'], ax=axes[1,1])

plt.show()

Much better!

Now we can isolate pickups and dropoffs to evaluate which are the top locations

In [42]:
pickups = filtered_coordinates[["pickup_latitude", "pickup_longitude", "total"]]

dropoffs = filtered_coordinates[["dropoff_latitude", "dropoff_longitude", "total"]]
In [43]:
top_5_pickups = pickups.groupby(["pickup_latitude", "pickup_longitude"]). \
                        count().sort_values(by='total', ascending=False)[:5]

top_5_dropoffs = dropoffs.groupby(["dropoff_latitude", "dropoff_longitude"]). \
                        count().sort_values(by='total', ascending=False)[:5]

top_5_pickups = top_5_pickups.reset_index().drop('total', axis=1)
top_5_dropoffs = top_5_dropoffs.reset_index().drop('total', axis=1)

Top 5 pickup locations

In [44]:
top_5_pickups
Out[44]:
pickup_latitude pickup_longitude
0 40.733697 -73.951818
1 40.758087 -73.989054
2 40.733760 -73.952003
3 40.698458 -73.988537
4 40.766275 -73.981893

Top 5 dropoff locations

In [45]:
top_5_dropoffs
Out[45]:
dropoff_latitude dropoff_longitude
0 40.733697 -73.951818
1 40.758087 -73.989054
2 40.733760 -73.952003
3 40.698458 -73.988537
4 40.766275 -73.981893

And... they are actually the same places! This result indicates that a lot of rides starts and ends at the exact same places. These points are possibly within a high traffic area. Probably these are the best places to be if you are a cab driver in New York! :D

Let's plot those points over the NYC map and see if gain some insights.

In [46]:
fig = go.Figure()

fig.add_trace(go.Scattermapbox(
    mode = "markers",
    lon = top_5_pickups['pickup_longitude'],
    lat = top_5_pickups['pickup_latitude'],
    marker = {'size': 20, 'opacity': 0.8}))


fig.update_layout(
    title={'text': '<b>Top 5 pickup and dropoff points</b>', 'x': 0.5, 'xanchor': 'center'},
    margin ={'l':0,'t':50,'b':0,'r':0},
    mapbox = {
        'center': {'lon': -74, 'lat': 40.73},
        'style': "open-street-map",
        'zoom': 10})

fig.show()

This actually make a lot of sense. Two points in manhattan, one in Brooklyn and two in Queens! These are, in fact, areas in NYC with a lot of traffic. The points in queens are very close to each other. A very strategic location if you are a cab driver waiting for passengers around Queens!

Find what the fare amount (inclusive of tolls) for a taxi ride in New York City given the pickup and dropoff locations

For this one, I will train a simple model that predicts the total amount (fare amount + tolls) of a cab ride and receives as features the pickup and dropoff locations.

The model will be trained and evaluated using PySpark MLlib. First, we need to split the data into training and test. The test set will be the holdout, used only for evaluation.

In [47]:
data = spark.createDataFrame(filtered_coordinates)

# 20% held for testing
(training, test) = data.randomSplit([0.8, 0.2])

We need to clean up the training data a little bit and also perform some feature engineering. It is possible to remove records that have pickup and dropoff on the same location because these rides with distance 0 do not represent the major trend. Also, it is better to remove all the records where the total amount is bellow 2.50 dollars, because that was the minimum fare charged by NYC cabs is those years. Although a total amount below 2.50 could be legit, it certainly does not represent the trend that we are interest in modelling.

In [48]:
training = training.filter((training.pickup_latitude != training.dropoff_latitude) \
                         & (training.pickup_longitude != training.dropoff_longitude)) \
                   .filter(training.total > 2.50)

Next, we will perform some feature engineering to calculate the euclidian distance between the two points. Note that this distance may be different from the "trip_distance" original feature, because it represents the euclidian distance between two points and not a GPS distance (that can be affected by the path taken).

The euclidian distance is actually a L2 norm and, for a 2D plane, it can be defined as follows:

\begin{equation*} d(x,y) = {\left( \sum_{k=1}^n {\left |x_k - y_k \right |}^2 \right )}^{1/2} \end{equation*}

By feeding this additional feature into the model its perform may increase.

In [50]:
def calculate_distance(pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude):
    """Calculate euclidian distance based on pickup and dropoff coordinates.
    
    Args:
        pickup_latitude (float): The latitude value of the pickup
        pickup_longitude (float): The longitude value of the pickup
        dropoff_latitude (float): The latitude value of the dropoff
        dropoff_longitude (float): The longitude value of the dropoff
    
    Returns:
        float: L2 norm (euclidian distance) of the two points
        
    """
    pickup = np.array((pickup_latitude, pickup_longitude))
    dropoff = np.array((dropoff_latitude, dropoff_longitude))
    
    return float(np.linalg.norm(pickup-dropoff))

udf_calculate_distance = udf(calculate_distance, FloatType())

# Creating the new feature for both the training and test sets

training = training.withColumn('distance', 
                    udf_calculate_distance(training.pickup_latitude, training.pickup_longitude, 
                                           training.dropoff_latitude, training.dropoff_longitude))

test = test.withColumn('distance', 
                        udf_calculate_distance(test.pickup_latitude, test.pickup_longitude, 
                                               test.dropoff_latitude, test.dropoff_longitude))

Let's take a look at the training data using the new feature we created: distance

In [51]:
training_pd = training.select("distance", "total").toPandas()

plt.figure(figsize=(15,8))

sns.scatterplot(x=training_pd['distance'], y=training_pd['total'], alpha=0.7)
plt.title('Total amount of rides and trip distance (training set)', fontsize=15, pad=20)

plt.show()

It is better if we remove trips with total value > 100, because they can be considered as outliers. So, we proceed to one more step of cleaning

In [52]:
training = training.filter(training.total < 100)

Now we need to scale our data before training the model and also perform MLlib specific steps like vector assemble.

In [53]:
features_col = ["pickup_latitude", "pickup_longitude", "dropoff_latitude", "dropoff_longitude", "distance"]

assembler = VectorAssembler(inputCols=features_col, outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)

pipeline = Pipeline(stages = [assembler, scaler])
pipelineModel = pipeline.fit(training)

scaled_training = pipelineModel.transform(training)

# It is very important to not fit the scaler again on the test data! Just transform it
scaled_test = pipelineModel.transform(test)

scaled_test = scaled_test.select("scaledFeatures", "total")
scaled_training = scaled_training.select("scaledFeatures", "total")

Implementing Linear Regression Model

As can be seen by the plot of distance vs total amount, the variables have a pretty linear relationship. So, the first try will be with a simples linear regression model, using default parameters.

In [54]:
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="total")

lrModel = lr.fit(scaled_training)

print('Train R²: ', lrModel.summary.r2)
Train R²:  0.8071158736772981
In [55]:
pred = lrModel.evaluate(scaled_test)

evaluation = RegressionEvaluator(predictionCol="prediction", labelCol="total")

r2 = evaluation.evaluate(pred.predictions, {evaluation.metricName: "r2"})

print('R² on the test set: ', r2)
R² on the test set:  0.7465892029553096
In [56]:
rmse = evaluation.evaluate(pred.predictions, {evaluation.metricName: "rmse"})

print('Root Mean Squared Error (RMSE) on the test set: ', rmse)
Root Mean Squared Error (RMSE) on the test set:  4.198971207985072

The results of the linear model are pretty good, because our error of the total amount estimation is about 4 dollars! Of course, in some cases the model's performance is better and in other cases worst. This is just a simple implementation with default parameters, but we certainly can improve it by tuning regularization parameters and investigating in which cases the model performance is worst.

Implementing Random Forest Regression

Let's experiment with another family of algorithms: tree-based methods!

For this one I will use the default parameters (20 estimators and max depth of 5) with no tuning at all.

In [57]:
rf = RandomForestRegressor(featuresCol="scaledFeatures", labelCol="total")
rfModel = rf.fit(scaled_training)

pred_rf = rfModel.transform(scaled_test)

r2_rf = evaluation.evaluate(pred_rf, {evaluation.metricName: "r2"})

print('R² on the test set: ', r2_rf)
R² on the test set:  0.7445639488425744
In [58]:
rmse_rf = evaluation.evaluate(pred_rf, {evaluation.metricName: "rmse"})

print('Root Mean Squared Error (RMSE) on the test set: ', rmse_rf)
Root Mean Squared Error (RMSE) on the test set:  4.2157168645391785

The performance of the random forest algorithm is about the same as the linear regression model. In general, RF works better when we have more features (because of its feature sampling methodology) and with tuned parameters! The performance of the model could be increased considerably if the hyperparameters were tuned.

The conclusion is that even with no tuning at all, we can build a simple model that predicts total amount of a ride fare with only pickup and dropoff locations as inputs. Probably we could build more robust models if more features were used.

Make a latitude and longitude map view of pickups and dropoffs in the year 2010

For this one I will pickup only the latitudes and longitudes bounded by the area I defined earlier! Also, I will limit the result for only 2000 records, because the map visualization is super heavy!

In [59]:
coordinates_query = """
SELECT pickup_latitude,
       pickup_longitude,
       dropoff_latitude,
       dropoff_longitude
FROM trips
WHERE (pickup_latitude >= 40.5 AND pickup_latitude <= 40.9)
AND (pickup_longitude >= -74.3 AND pickup_longitude <= -73.7)
AND (dropoff_latitude >= 40.5 AND dropoff_latitude <= 40.9)
AND (dropoff_longitude >= -74.3 AND dropoff_longitude <= -73.7)
LIMIT 2000
"""

coordinates = spark.sql(coordinates_query).toPandas()
In [60]:
fig = go.Figure()

fig.add_trace(go.Scattermapbox(
    mode = "markers",
    lon = coordinates['pickup_longitude'],
    lat = coordinates['pickup_latitude'],
    marker = {'size': 10, 'opacity': 0.3},
    name="Pickups"))


fig.update_layout(
    title={'text': '<b>Pickup locations 2010</b>', 'x': 0.5, 'xanchor': 'center'},
    margin ={'l':0,'t':50,'b':0,'r':0},
    mapbox = {
        'center': {'lon': -74, 'lat': 40.73},
        'style': "open-street-map",
        'zoom': 10})

fig.show()
In [61]:
fig = go.Figure()

fig.add_trace(go.Scattermapbox(
    mode = "markers",
    lon = coordinates['dropoff_longitude'],
    lat = coordinates['dropoff_latitude'],
    marker = {'size': 10, 'opacity': 0.3, 'color': 'orangered'}))


fig.update_layout(
    title={'text': '<b>Dropoff locations 2010</b>', 'x': 0.5, 'xanchor': 'center'},
    margin ={'l':0,'t':50,'b':0,'r':0},
    mapbox = {
        'center': {'lon': -74, 'lat': 40.73},
        'style': "open-street-map",
        'zoom': 10})

fig.show()

The vast majority of both pickup and dropoff locations are in comercial center Manhattan, as expected. Notable points here are the two airports: La Guardia and JFK, both with greater concentration of pickup and dropoffs occurances.

The only notable difference between pickups and dropoffs is that there is more dropoffs points heading towards the Bronx area (north of manhattan) than pickup points, in the same location. Besides that, the pattern is pretty similar.